package natsHelper

import (
	"encoding/json"
	"errors"
	"fmt"
	"gitee.com/lv_baobao/gcore"
	"gitee.com/lv_baobao/gcore/logHelper"
	"time"
)
import "github.com/nats-io/nats.go"

func init() {
}

type NatsMq struct {
	consumer *NatsConsumer
	producer *NatsProducer
}

func NewNatsMq() *NatsMq {
	return &NatsMq{
		consumer: new(NatsConsumer),
		producer: new(NatsProducer),
	}
}

func (n *NatsMq) Subscribe(subject string, cb nats.MsgHandler) {
	n.consumer.Subscribe(subject, cb)
}

func (n *NatsMq) QueueSubscribe(subject string, queue string, cb nats.MsgHandler) {
	n.consumer.QueueSubscribe(subject, queue, cb)
}

func (n *NatsMq) Reply(subject string, fun func() interface{}) {
	n.consumer.QueueSubscribe(subject, subject, func(msg *nats.Msg) {
		res := fun()
		n.producer.Publish(msg.Reply, res)
	})
}

func (n *NatsMq) ReplyWithPara(subject string, fun func(m *nats.Msg) interface{}) {
	n.consumer.QueueSubscribe(subject, subject, func(msg *nats.Msg) {
		defer func() {
			if r := recover(); r != nil {
				n.producer.Publish(msg.Reply, r)
			}
		}()
		res := fun(msg)
		n.producer.Publish(msg.Reply, res)
	})
}

func (n *NatsMq) Publish(subject string, content interface{}) error {
	return n.producer.Publish(subject, content)
}

func (n *NatsMq) Request(subject string, content interface{}, timeout time.Duration, v any) error {
	request, err := n.producer.Request(subject, content, timeout)
	if err != nil {
		return err
	}
	return json.Unmarshal(request.Data, v)
}

type NatsSubscribeInfo struct {
	subject string
	queue   string
	handler nats.MsgHandler
}

type NatsConsumer struct {
	Conn *nats.Conn
	subs map[string]NatsSubscribeInfo
}

func (n *NatsConsumer) tryConnect() error {
	if n.Conn != nil {
		if n.Conn.IsClosed() {
			logHelper.Info("Nats释放上次连接对象")
			n.Conn.Close()
			n.Conn = nil
		} else if n.Conn.IsConnected() {

		} else if n.Conn.IsDraining() {

		} else if n.Conn.IsReconnecting() {

		}
	}
	config := gcore.NewAppSettingsHelper().GetAppConfig()
	logHelper.Info(fmt.Sprintf("Nats尝试连接服务器%s", config.NatsUrl))
	opts := nats.Options{AllowReconnect: false, MaxReconnect: 10, ReconnectWait: 5 * time.Second, Timeout: 1 * time.Second} //订阅不重连，自己实现消费者重连
	opts.Url = config.NatsUrl
	opts.DisconnectedErrCB = func(conn *nats.Conn, err error) {
		logHelper.Info("Nats断开", err)
		time.Sleep(time.Second * 3)
		for _, subscribeInfo := range n.subs {
			if subscribeInfo.queue == "" {
				n.Subscribe(subscribeInfo.subject, subscribeInfo.handler)
			} else {
				n.QueueSubscribe(subscribeInfo.subject, subscribeInfo.queue, subscribeInfo.handler)
			}
		}
	}
	conn, err := opts.Connect()
	if err == nil {
		logHelper.Info("连接Nats服务器成功")
	} else {
		logHelper.Error(err)
	}
	n.Conn = conn
	return err
}

func (n *NatsConsumer) Subscribe(subject string, cb nats.MsgHandler) {
	if n.subs == nil {
		n.subs = make(map[string]NatsSubscribeInfo, 0)
	}
	_, existsSub := n.subs[subject]
	if !existsSub {
		n.subs[subject] = NatsSubscribeInfo{subject: subject, handler: cb}
	}
	n.tryConnect()
	if n.Conn == nil {
		logHelper.Error("订阅失败,主题:" + subject)
		time.Sleep(time.Second * 3)
		n.Subscribe(subject, cb)
		return
	}
	_, err := n.Conn.Subscribe(subject, cb)
	if err != nil {
		logHelper.Error("关注主题", subject, err)
		time.Sleep(time.Second * 3)
		n.Subscribe(subject, cb)
	}
}

func (n *NatsConsumer) QueueSubscribe(subject string, queue string, cb nats.MsgHandler) {
	if n.subs == nil {
		n.subs = make(map[string]NatsSubscribeInfo, 0)
	}
	_, existsSub := n.subs[subject]
	if !existsSub {
		n.subs[subject] = NatsSubscribeInfo{subject: subject, queue: queue, handler: cb}
	}
	n.tryConnect()
	if n.Conn == nil {
		logHelper.Error("订阅失败,主题:" + subject)
		time.Sleep(time.Second * 3)
		n.Subscribe(subject, cb)
		return
	}
	_, err := n.Conn.QueueSubscribe(subject, queue, cb)
	if err != nil {
		logHelper.Error("关注主题", subject, err)
		time.Sleep(time.Second * 3)
		n.QueueSubscribe(subject, queue, cb)
	}
}

type NatsProducer struct {
	Conn *nats.Conn
}

func (n *NatsProducer) tryConnect() error {
	if n.Conn != nil {
		logHelper.Info("Nats释放上次连接对象")
		n.Conn.Close()
		n.Conn = nil
	}
	config := gcore.NewAppSettingsHelper().GetAppConfig()
	logHelper.Info("Nats尝试连接服务器")
	opts := nats.Options{AllowReconnect: true, MaxReconnect: 10, ReconnectWait: 5 * time.Second, Timeout: 1 * time.Second}
	opts.Url = config.NatsUrl
	opts.DisconnectedErrCB = func(conn *nats.Conn, err error) {
		logHelper.Info("Nats断开", err)
	}
	conn, err := opts.Connect()
	//c, _ := natsHelper.NewEncodedConn(nc, natsHelper.JSON_ENCODER)
	//defer conn.Close()
	n.Conn = conn
	return err
}

func (n *NatsProducer) Publish(subject string, content interface{}) error {
	if n.Conn == nil || n.Conn.IsClosed() {
		if n.tryConnect() != nil {
			return errors.New("连接nats失败")
		}
	}
	if n.Conn != nil && n.Conn.IsReconnecting() {
		return errors.New("重连nats")
	}
	bytes, _ := json.Marshal(content)
	err := n.Conn.Publish(subject, bytes)
	return err
}

func (n *NatsProducer) Request(subject string, content interface{}, timeout time.Duration) (*nats.Msg, error) {
	if n.Conn == nil || n.Conn.IsClosed() {
		if n.tryConnect() != nil {
			return nil, errors.New("连接nats失败")
		}
	}
	if n.Conn != nil && n.Conn.IsReconnecting() {
		return nil, errors.New("重连nats")
	}
	bytes, _ := json.Marshal(content)
	return n.Conn.Request(subject, bytes, timeout)
}
